From: Ben Pfaff Date: Thu, 12 Jul 2012 21:18:05 +0000 (-0700) Subject: ofproto: New feature to notify controllers of flow table changes. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2b07c8b182b76e4e3a162796d3ae273ef51d4131;p=openvswitch ofproto: New feature to notify controllers of flow table changes. OpenFlow switching monitoring and controller coordination can be made more efficient if the switch can notify a controller of flow table changes as they occur, rather than periodically polling for changes. This commit implements such a feature. Feature #6633. CC: Natasha Gude Signed-off-by: Ben Pfaff --- diff --git a/NEWS b/NEWS index f5190c0f..5b9db40d 100644 --- a/NEWS +++ b/NEWS @@ -22,6 +22,8 @@ post-v1.7.0 queue does not exist, or for requests for a specific queue on all ports, if the specified queue does not exist on any port. (Previous versions generally reported an empty set of results.) + - New "flow monitor" feature to allow controllers to be notified of + flow table changes as they happen. - Additional protocols are not mirrored and dropped when forward-bpdu is false. For a full list, see the ovs-vswitchd.conf.db man page. - Open vSwitch now sends RARP packets in situations where it previously diff --git a/include/openflow/nicira-ext.h b/include/openflow/nicira-ext.h index 82deeb0f..1104dbf8 100644 --- a/include/openflow/nicira-ext.h +++ b/include/openflow/nicira-ext.h @@ -117,6 +117,11 @@ enum nicira_type { NXT_SET_ASYNC_CONFIG = 19, /* struct nx_async_config. */ NXT_SET_CONTROLLER_ID = 20, /* struct nx_controller_id. */ + + /* Flow table monitoring (see also NXST_FLOW_MONITOR). */ + NXT_FLOW_MONITOR_CANCEL = 21, /* struct nx_flow_monitor_cancel. */ + NXT_FLOW_MONITOR_PAUSED = 22, /* struct nicira_header. */ + NXT_FLOW_MONITOR_RESUMED = 23, /* struct nicira_header. */ }; /* Header for Nicira vendor stats request and reply messages. */ @@ -131,7 +136,10 @@ OFP_ASSERT(sizeof(struct nicira_stats_msg) == 24); enum nicira_stats_type { /* Flexible flow specification (aka NXM = Nicira Extended Match). */ NXST_FLOW, /* Analogous to OFPST_FLOW. */ - NXST_AGGREGATE /* Analogous to OFPST_AGGREGATE. */ + NXST_AGGREGATE, /* Analogous to OFPST_AGGREGATE. */ + + /* Flow table monitoring. */ + NXST_FLOW_MONITOR, }; /* Fields to use when hashing flows. */ @@ -1976,5 +1984,240 @@ struct nx_action_controller { uint8_t zero; /* Must be zero. */ }; OFP_ASSERT(sizeof(struct nx_action_controller) == 16); + +/* Flow Table Monitoring + * ===================== + * + * NXST_FLOW_MONITOR allows a controller to keep track of changes to OpenFlow + * flow table(s) or subsets of them, with the following workflow: + * + * 1. The controller sends an NXST_FLOW_MONITOR request to begin monitoring + * flows. The 'id' in the request must be unique among all monitors that + * the controller has started and not yet canceled on this OpenFlow + * connection. + * + * 2. The switch responds with an NXST_FLOW_MONITOR reply. If the request's + * 'flags' included NXFMF_INITIAL, the reply includes all the flows that + * matched the request at the time of the request (with event NXFME_ADDED). + * If 'flags' did not include NXFMF_INITIAL, the reply is empty. + * + * The reply uses the xid of the request (as do all replies to OpenFlow + * requests). + * + * 3. Whenever a change to a flow table entry matches some outstanding monitor + * request's criteria and flags, the switch sends a notification to the + * controller as an additional NXST_FLOW_MONITOR reply with xid 0. + * + * When multiple outstanding monitors match a single change, only a single + * notification is sent. This merged notification includes the information + * requested in any of the individual monitors. That is, if any of the + * matching monitors requests actions (NXFMF_ACTIONS), the notification + * includes actions, and if any of the monitors request full changes for the + * controller's own changes (NXFMF_OWN), the controller's own changes will + * be included in full. + * + * 4. The controller may cancel a monitor with NXT_FLOW_MONITOR_CANCEL. No + * further notifications will be sent on the basis of the canceled monitor + * afterward. + * + * + * Buffer Management + * ================= + * + * OpenFlow messages for flow monitor notifications can overflow the buffer + * space available to the switch, either temporarily (e.g. due to network + * conditions slowing OpenFlow traffic) or more permanently (e.g. the sustained + * rate of flow table change exceeds the network bandwidth between switch and + * controller). + * + * When Open vSwitch's notification buffer space reaches a limiting threshold, + * OVS reacts as follows: + * + * 1. OVS sends an NXT_FLOW_MONITOR_PAUSED message to the controller, following + * all the already queued notifications. After it receives this message, + * the controller knows that its view of the flow table, as represented by + * flow monitor notifications, is incomplete. + * + * 2. As long as the notification buffer is not empty: + * + * - NXMFE_ADD and NXFME_MODIFIED notifications will not be sent. + * + * - NXFME_DELETED notifications will still be sent, but only for flows + * that existed before OVS sent NXT_FLOW_MONITOR_PAUSED. + * + * - NXFME_ABBREV notifications will not be sent. They are treated as + * the expanded version (and therefore only the NXFME_DELETED + * components, if any, are sent). + * + * 3. When the notification buffer empties, OVS sends NXFME_ADD notifications + * for flows added since the buffer reached its limit and NXFME_MODIFIED + * notifications for flows that existed before the limit was reached and + * changed after the limit was reached. + * + * 4. OVS sends an NXT_FLOW_MONITOR_RESUMED message to the controller. After + * it receives this message, the controller knows that its view of the flow + * table, as represented by flow monitor notifications, is again complete. + * + * This allows the maximum buffer space requirement for notifications to be + * bounded by the limit plus the maximum number of supported flows. + * + * + * "Flow Removed" messages + * ======================= + * + * The flow monitor mechanism is independent of OFPT_FLOW_REMOVED and + * NXT_FLOW_REMOVED. Flow monitor updates for deletion are sent if + * NXFMF_DELETE is set on a monitor, regardless of whether the + * OFPFF_SEND_FLOW_REM flag was set when the flow was added. */ + +/* NXST_FLOW_MONITOR request. + * + * The NXST_FLOW_MONITOR request's body consists of an array of zero or more + * instances of this structure. The request arranges to monitor the flows + * that match the specified criteria, which are interpreted in the same way as + * for NXST_FLOW. + * + * 'id' identifies a particular monitor for the purpose of allowing it to be + * canceled later with NXT_FLOW_MONITOR_CANCEL. 'id' must be unique among + * existing monitors that have not already been canceled. + * + * The reply includes the initial flow matches for monitors that have the + * NXFMF_INITIAL flag set. No single flow will be included in the reply more + * than once, even if more than one requested monitor matches that flow. The + * reply will be empty if none of the monitors has NXFMF_INITIAL set or if none + * of the monitors initially matches any flows. + * + * For NXFMF_ADD, an event will be reported if 'out_port' matches against the + * actions of the flow being added or, for a flow that is replacing an existing + * flow, if 'out_port' matches against the actions of the flow being replaced. + * For NXFMF_DELETE, 'out_port' matches against the actions of a flow being + * deleted. For NXFMF_MODIFY, an event will be reported if 'out_port' matches + * either the old or the new actions. */ +struct nx_flow_monitor_request { + ovs_be32 id; /* Controller-assigned ID for this monitor. */ + ovs_be16 flags; /* NXFMF_*. */ + ovs_be16 out_port; /* Required output port, if not OFPP_NONE. */ + ovs_be16 match_len; /* Length of nx_match. */ + uint8_t table_id; /* One table's ID or 0xff for all tables. */ + uint8_t zeros[5]; /* Align to 64 bits (must be zero). */ + /* Followed by: + * - Exactly match_len (possibly 0) bytes containing the nx_match, then + * - Exactly (match_len + 7)/8*8 - match_len (between 0 and 7) bytes of + * all-zero bytes. */ +}; +OFP_ASSERT(sizeof(struct nx_flow_monitor_request) == 16); + +/* 'flags' bits in struct nx_flow_monitor_request. */ +enum nx_flow_monitor_flags { + /* When to send updates. */ + NXFMF_INITIAL = 1 << 0, /* Initially matching flows. */ + NXFMF_ADD = 1 << 1, /* New matching flows as they are added. */ + NXFMF_DELETE = 1 << 2, /* Old matching flows as they are removed. */ + NXFMF_MODIFY = 1 << 3, /* Matching flows as they are changed. */ + + /* What to include in updates. */ + NXFMF_ACTIONS = 1 << 4, /* If set, actions are included. */ + NXFMF_OWN = 1 << 5, /* If set, include own changes in full. */ +}; + +/* NXST_FLOW_MONITOR reply header. + * + * The body of an NXST_FLOW_MONITOR reply is an array of variable-length + * structures, each of which begins with this header. The 'length' member may + * be used to traverse the array, and the 'event' member may be used to + * determine the particular structure. + * + * Every instance is a multiple of 8 bytes long. */ +struct nx_flow_update_header { + ovs_be16 length; /* Length of this entry. */ + ovs_be16 event; /* One of NXFME_*. */ + /* ...other data depending on 'event'... */ +}; +OFP_ASSERT(sizeof(struct nx_flow_update_header) == 4); + +/* 'event' values in struct nx_flow_update_header. */ +enum nx_flow_update_event { + /* struct nx_flow_update_full. */ + NXFME_ADDED = 0, /* Flow was added. */ + NXFME_DELETED = 1, /* Flow was deleted. */ + NXFME_MODIFIED = 2, /* Flow (generally its actions) was changed. */ + + /* struct nx_flow_update_abbrev. */ + NXFME_ABBREV = 3, /* Abbreviated reply. */ +}; + +/* NXST_FLOW_MONITOR reply for NXFME_ADDED, NXFME_DELETED, and + * NXFME_MODIFIED. */ +struct nx_flow_update_full { + ovs_be16 length; /* Length is 24. */ + ovs_be16 event; /* One of NXFME_*. */ + ovs_be16 reason; /* OFPRR_* for NXFME_DELETED, else zero. */ + ovs_be16 priority; /* Priority of the entry. */ + ovs_be16 idle_timeout; /* Number of seconds idle before expiration. */ + ovs_be16 hard_timeout; /* Number of seconds before expiration. */ + ovs_be16 match_len; /* Length of nx_match. */ + uint8_t table_id; /* ID of flow's table. */ + uint8_t pad; /* Reserved, currently zeroed. */ + ovs_be64 cookie; /* Opaque controller-issued identifier. */ + /* Followed by: + * - Exactly match_len (possibly 0) bytes containing the nx_match, then + * - Exactly (match_len + 7)/8*8 - match_len (between 0 and 7) bytes of + * all-zero bytes, then + * - Actions to fill out the remainder 'length' bytes (always a multiple + * of 8). If NXFMF_ACTIONS was not specified, or 'event' is + * NXFME_DELETED, no actions are included. + */ +}; +OFP_ASSERT(sizeof(struct nx_flow_update_full) == 24); + +/* NXST_FLOW_MONITOR reply for NXFME_ABBREV. + * + * When the controller does not specify NXFMF_OWN in a monitor request, any + * flow tables changes due to the controller's own requests (on the same + * OpenFlow channel) will be abbreviated, when possible, to this form, which + * simply specifies the 'xid' of the OpenFlow request (e.g. an OFPT_FLOW_MOD or + * NXT_FLOW_MOD) that caused the change. + * + * Some changes cannot be abbreviated and will be sent in full: + * + * - Changes that only partially succeed. This can happen if, for example, + * a flow_mod with type OFPFC_MODIFY affects multiple flows, but only some + * of those modifications succeed (e.g. due to hardware limitations). + * + * This cannot occur with the current implementation of the Open vSwitch + * software datapath. It could happen with other datapath implementations. + * + * - Changes that race with conflicting changes made by other controllers or + * other flow_mods (not separated by barriers) by the same controller. + * + * This cannot occur with the current Open vSwitch implementation + * (regardless of datapath) because Open vSwitch internally serializes + * potentially conflicting changes. + * + * A flow_mod that does not change the flow table will not trigger any + * notification, even an abbreviated one. For example, a "modify" or "delete" + * flow_mod that does not match any flows will not trigger a notification. + * Whether an "add" or "modify" that specifies all the same parameters that a + * flow already has triggers a notification is unspecified and subject to + * change in future versions of Open vSwitch. + * + * OVS will always send the notifications for a given flow table change before + * the reply to a OFPT_BARRIER_REQUEST request that precedes the flow table + * change. Thus, if the controller does not receive an abbreviated + * notification for a flow_mod before the next OFPT_BARRIER_REPLY, it will + * never receive one. */ +struct nx_flow_update_abbrev { + ovs_be16 length; /* Length is 8. */ + ovs_be16 event; /* NXFME_ABBREV. */ + ovs_be32 xid; /* Controller-specified xid from flow_mod. */ +}; +OFP_ASSERT(sizeof(struct nx_flow_update_abbrev) == 8); + +/* Used by a controller to cancel an outstanding monitor. */ +struct nx_flow_monitor_cancel { + struct nicira_header nxh; /* Type NXT_FLOW_MONITOR_CANCEL. */ + ovs_be32 id; /* 'id' from nx_flow_monitor_request. */ +}; +OFP_ASSERT(sizeof(struct nx_flow_monitor_cancel) == 20); #endif /* openflow/nicira-ext.h */ diff --git a/lib/learning-switch.c b/lib/learning-switch.c index cb0e49bc..b41bea0d 100644 --- a/lib/learning-switch.c +++ b/lib/learning-switch.c @@ -295,12 +295,17 @@ lswitch_process_packet(struct lswitch *sw, struct rconn *rconn, case OFPUTIL_NXT_FLOW_MOD: case OFPUTIL_NXT_FLOW_REMOVED: case OFPUTIL_NXT_FLOW_AGE: + case OFPUTIL_NXT_FLOW_MONITOR_CANCEL: + case OFPUTIL_NXT_FLOW_MONITOR_PAUSED: + case OFPUTIL_NXT_FLOW_MONITOR_RESUMED: case OFPUTIL_NXT_SET_ASYNC_CONFIG: case OFPUTIL_NXT_SET_CONTROLLER_ID: case OFPUTIL_NXST_FLOW_REQUEST: case OFPUTIL_NXST_AGGREGATE_REQUEST: + case OFPUTIL_NXST_FLOW_MONITOR_REQUEST: case OFPUTIL_NXST_FLOW_REPLY: case OFPUTIL_NXST_AGGREGATE_REPLY: + case OFPUTIL_NXST_FLOW_MONITOR_REPLY: default: if (VLOG_IS_DBG_ENABLED()) { char *s = ofp_to_string(msg->data, msg->size, 2); diff --git a/lib/ofp-errors.h b/lib/ofp-errors.h index 61cef418..dddf8d04 100644 --- a/lib/ofp-errors.h +++ b/lib/ofp-errors.h @@ -129,6 +129,20 @@ enum ofperr { * valid. */ OFPERR_NXBRC_BAD_REASON, + /* NX1.0+(1,517). The 'id' in an NXST_FLOW_MONITOR request is the same as + * an existing monitor id (or two monitors in the same NXST_FLOW_MONITOR + * request have the same 'id'). */ + OFPERR_NXBRC_FM_DUPLICATE_ID, + + /* NX1.0+(1,518). The 'flags' in an NXST_FLOW_MONITOR request either does + * not specify at least one of the NXFMF_ADD, NXFMF_DELETE, or NXFMF_MODIFY + * flags, or specifies a flag bit that is not defined. */ + OFPERR_NXBRC_FM_BAD_FLAGS, + + /* NX1.0+(1,519). The 'id' in an NXT_FLOW_MONITOR_CANCEL request is not + * the id of any existing monitor. */ + OFPERR_NXBRC_FM_BAD_ID, + /* ## ---------------- ## */ /* ## OFPET_BAD_ACTION ## */ /* ## ---------------- ## */ @@ -469,7 +483,6 @@ enum ofperr { /* NX1.0(1,513), NX1.1(1,513), OF1.2+(11,2). Invalid role. */ OFPERR_OFPRRFC_BAD_ROLE, - /* ## ------------------ ## */ /* ## OFPET_EXPERIMENTER ## */ /* ## ------------------ ## */ diff --git a/lib/ofp-parse.c b/lib/ofp-parse.c index 922e2968..32d38368 100644 --- a/lib/ofp-parse.c +++ b/lib/ofp-parse.c @@ -700,6 +700,68 @@ parse_ofp_str(struct ofputil_flow_mod *fm, int command, const char *str_, free(string); } +/* Convert 'str_' (as described in the documentation for the "monitor" command + * in the ovs-ofctl man page) into 'fmr'. */ +void +parse_flow_monitor_request(struct ofputil_flow_monitor_request *fmr, + const char *str_) +{ + static uint32_t id; + + char *string = xstrdup(str_); + char *save_ptr = NULL; + char *name; + + fmr->id = id++; + fmr->flags = (NXFMF_INITIAL | NXFMF_ADD | NXFMF_DELETE | NXFMF_MODIFY + | NXFMF_OWN | NXFMF_ACTIONS); + fmr->out_port = OFPP_NONE; + fmr->table_id = 0xff; + cls_rule_init_catchall(&fmr->match, 0); + + for (name = strtok_r(string, "=, \t\r\n", &save_ptr); name; + name = strtok_r(NULL, "=, \t\r\n", &save_ptr)) { + const struct protocol *p; + + if (!strcmp(name, "!initial")) { + fmr->flags &= ~NXFMF_INITIAL; + } else if (!strcmp(name, "!add")) { + fmr->flags &= ~NXFMF_ADD; + } else if (!strcmp(name, "!delete")) { + fmr->flags &= ~NXFMF_DELETE; + } else if (!strcmp(name, "!modify")) { + fmr->flags &= ~NXFMF_MODIFY; + } else if (!strcmp(name, "!actions")) { + fmr->flags &= ~NXFMF_ACTIONS; + } else if (!strcmp(name, "!own")) { + fmr->flags &= ~NXFMF_OWN; + } else if (parse_protocol(name, &p)) { + cls_rule_set_dl_type(&fmr->match, htons(p->dl_type)); + if (p->nw_proto) { + cls_rule_set_nw_proto(&fmr->match, p->nw_proto); + } + } else { + char *value; + + value = strtok_r(NULL, ", \t\r\n", &save_ptr); + if (!value) { + ovs_fatal(0, "%s: field %s missing value", str_, name); + } + + if (!strcmp(name, "table")) { + fmr->table_id = str_to_table_id(value); + } else if (!strcmp(name, "out_port")) { + fmr->out_port = atoi(value); + } else if (mf_from_name(name)) { + parse_field(mf_from_name(name), value, &fmr->match); + } else { + ovs_fatal(0, "%s: unknown keyword %s", str_, name); + } + } + } + free(string); +} + /* Parses 's' as a set of OpenFlow actions and appends the actions to * 'actions'. * diff --git a/lib/ofp-parse.h b/lib/ofp-parse.h index e9303885..d2d3c3cf 100644 --- a/lib/ofp-parse.h +++ b/lib/ofp-parse.h @@ -26,6 +26,7 @@ struct flow; struct ofpbuf; struct ofputil_flow_mod; +struct ofputil_flow_monitor_request; struct ofputil_flow_stats_request; void parse_ofp_str(struct ofputil_flow_mod *, int command, const char *str_, @@ -44,4 +45,7 @@ void parse_ofpacts(const char *, struct ofpbuf *ofpacts); char *parse_ofp_exact_flow(struct flow *, const char *); +void parse_flow_monitor_request(struct ofputil_flow_monitor_request *, + const char *); + #endif /* ofp-parse.h */ diff --git a/lib/ofp-print.c b/lib/ofp-print.c index 03de5f1e..48b8daa9 100644 --- a/lib/ofp-print.c +++ b/lib/ofp-print.c @@ -1368,6 +1368,137 @@ ofp_print_nxt_set_controller_id(struct ds *string, ds_put_format(string, " id=%"PRIu16, ntohs(nci->controller_id)); } +static void +ofp_print_nxt_flow_monitor_cancel(struct ds *string, + const struct ofp_header *oh) +{ + ds_put_format(string, " id=%"PRIu32, + ofputil_decode_flow_monitor_cancel(oh)); +} + +static const char * +nx_flow_monitor_flags_to_name(uint32_t bit) +{ + enum nx_flow_monitor_flags fmf = bit; + + switch (fmf) { + case NXFMF_INITIAL: return "initial"; + case NXFMF_ADD: return "add"; + case NXFMF_DELETE: return "delete"; + case NXFMF_MODIFY: return "modify"; + case NXFMF_ACTIONS: return "actions"; + case NXFMF_OWN: return "own"; + } + + return NULL; +} + +static void +ofp_print_nxst_flow_monitor_request(struct ds *string, + const struct ofp_header *oh) +{ + struct ofpbuf b; + + ofpbuf_use_const(&b, oh, ntohs(oh->length)); + for (;;) { + struct ofputil_flow_monitor_request request; + int retval; + + retval = ofputil_decode_flow_monitor_request(&request, &b); + if (retval) { + if (retval != EOF) { + ofp_print_error(string, retval); + } + return; + } + + ds_put_format(string, "\n id=%"PRIu32" flags=", request.id); + ofp_print_bit_names(string, request.flags, + nx_flow_monitor_flags_to_name, ','); + + if (request.out_port != OFPP_NONE) { + ds_put_cstr(string, " out_port="); + ofputil_format_port(request.out_port, string); + } + + if (request.table_id != 0xff) { + ds_put_format(string, " table=%"PRIu8, request.table_id); + } + + ds_put_char(string, ' '); + cls_rule_format(&request.match, string); + ds_chomp(string, ' '); + } +} + +static void +ofp_print_nxst_flow_monitor_reply(struct ds *string, + const struct ofp_header *oh) +{ + uint64_t ofpacts_stub[1024 / 8]; + struct ofpbuf ofpacts; + struct ofpbuf b; + + ofpbuf_use_const(&b, oh, ntohs(oh->length)); + ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub); + for (;;) { + struct ofputil_flow_update update; + struct cls_rule match; + int retval; + + update.match = &match; + retval = ofputil_decode_flow_update(&update, &b, &ofpacts); + if (retval) { + if (retval != EOF) { + ofp_print_error(string, retval); + } + ofpbuf_uninit(&ofpacts); + return; + } + + ds_put_cstr(string, "\n event="); + switch (update.event) { + case NXFME_ADDED: + ds_put_cstr(string, "ADDED"); + break; + + case NXFME_DELETED: + ds_put_format(string, "DELETED reason=%s", + ofp_flow_removed_reason_to_string(update.reason)); + break; + + case NXFME_MODIFIED: + ds_put_cstr(string, "MODIFIED"); + break; + + case NXFME_ABBREV: + ds_put_format(string, "ABBREV xid=0x%"PRIx32, ntohl(update.xid)); + continue; + } + + ds_put_format(string, " table=%"PRIu8, update.table_id); + if (update.idle_timeout != OFP_FLOW_PERMANENT) { + ds_put_format(string, " idle_timeout=%"PRIu16, + update.idle_timeout); + } + if (update.hard_timeout != OFP_FLOW_PERMANENT) { + ds_put_format(string, " hard_timeout=%"PRIu16, + update.hard_timeout); + } + ds_put_format(string, " cookie=%#"PRIx64, ntohll(update.cookie)); + + ds_put_char(string, ' '); + cls_rule_format(update.match, string); + + if (update.ofpacts_len) { + if (string->string[string->length - 1] != ' ') { + ds_put_char(string, ' '); + } + ofpacts_format(update.ofpacts, update.ofpacts_len, string); + } + } +} + void ofp_print_version(const struct ofp_header *oh, struct ds *string) @@ -1564,6 +1695,22 @@ ofp_to_string__(const struct ofp_header *oh, ofp_print_stats_reply(string, oh); ofp_print_nxst_aggregate_reply(string, msg); break; + + case OFPUTIL_NXT_FLOW_MONITOR_CANCEL: + ofp_print_nxt_flow_monitor_cancel(string, msg); + break; + + case OFPUTIL_NXT_FLOW_MONITOR_PAUSED: + case OFPUTIL_NXT_FLOW_MONITOR_RESUMED: + break; + + case OFPUTIL_NXST_FLOW_MONITOR_REQUEST: + ofp_print_nxst_flow_monitor_request(string, msg); + break; + + case OFPUTIL_NXST_FLOW_MONITOR_REPLY: + ofp_print_nxst_flow_monitor_reply(string, msg); + break; } } diff --git a/lib/ofp-util.c b/lib/ofp-util.c index cd261439..5fb8d8f2 100644 --- a/lib/ofp-util.c +++ b/lib/ofp-util.c @@ -678,6 +678,18 @@ ofputil_decode_vendor(const struct ofp_header *oh, size_t length, { OFPUTIL_NXT_SET_CONTROLLER_ID, OFP10_VERSION, NXT_SET_CONTROLLER_ID, "NXT_SET_CONTROLLER_ID", sizeof(struct nx_controller_id), 0 }, + + { OFPUTIL_NXT_FLOW_MONITOR_CANCEL, OFP10_VERSION, + NXT_FLOW_MONITOR_CANCEL, "NXT_FLOW_MONITOR_CANCEL", + sizeof(struct nx_flow_monitor_cancel), 0 }, + + { OFPUTIL_NXT_FLOW_MONITOR_PAUSED, OFP10_VERSION, + NXT_FLOW_MONITOR_PAUSED, "NXT_FLOW_MONITOR_PAUSED", + sizeof(struct nicira_header), 0 }, + + { OFPUTIL_NXT_FLOW_MONITOR_RESUMED, OFP10_VERSION, + NXT_FLOW_MONITOR_RESUMED, "NXT_FLOW_MONITOR_RESUMED", + sizeof(struct nicira_header), 0 }, }; static const struct ofputil_msg_category nxt_category = { @@ -760,6 +772,10 @@ ofputil_decode_nxst_request(const struct ofp_header *oh, size_t length, { OFPUTIL_NXST_AGGREGATE_REQUEST, OFP10_VERSION, NXST_AGGREGATE, "NXST_AGGREGATE request", sizeof(struct nx_aggregate_stats_request), 8 }, + + { OFPUTIL_NXST_FLOW_MONITOR_REQUEST, OFP10_VERSION, + NXST_FLOW_MONITOR, "NXST_FLOW_MONITOR request", + sizeof(struct nicira_stats_msg), 8 }, }; static const struct ofputil_msg_category nxst_request_category = { @@ -793,6 +809,10 @@ ofputil_decode_nxst_reply(const struct ofp_header *oh, size_t length, { OFPUTIL_NXST_AGGREGATE_REPLY, OFP10_VERSION, NXST_AGGREGATE, "NXST_AGGREGATE reply", sizeof(struct nx_aggregate_stats_reply), 0 }, + + { OFPUTIL_NXST_FLOW_MONITOR_REPLY, OFP10_VERSION, + NXST_FLOW_MONITOR, "NXST_FLOW_MONITOR reply", + sizeof(struct nicira_stats_msg), 8 }, }; static const struct ofputil_msg_category nxst_reply_category = { @@ -3095,7 +3115,268 @@ ofputil_encode_port_mod(const struct ofputil_port_mod *pm, return b; } + +/* ofputil_flow_monitor_request */ + +/* Converts an NXST_FLOW_MONITOR request in 'msg' into an abstract + * ofputil_flow_monitor_request in 'rq'. + * + * Multiple NXST_FLOW_MONITOR requests can be packed into a single OpenFlow + * message. Calling this function multiple times for a single 'msg' iterates + * through the requests. The caller must initially leave 'msg''s layer + * pointers null and not modify them between calls. + * + * Returns 0 if successful, EOF if no requests were left in this 'msg', + * otherwise an OFPERR_* value. */ +int +ofputil_decode_flow_monitor_request(struct ofputil_flow_monitor_request *rq, + struct ofpbuf *msg) +{ + struct nx_flow_monitor_request *nfmr; + uint16_t flags; + + if (!msg->l2) { + msg->l2 = msg->data; + ofpbuf_pull(msg, sizeof(struct nicira_stats_msg)); + } + + if (!msg->size) { + return EOF; + } + + nfmr = ofpbuf_try_pull(msg, sizeof *nfmr); + if (!nfmr) { + VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR request has %zu " + "leftover bytes at end", msg->size); + return OFPERR_OFPBRC_BAD_LEN; + } + + flags = ntohs(nfmr->flags); + if (!(flags & (NXFMF_ADD | NXFMF_DELETE | NXFMF_MODIFY)) + || flags & ~(NXFMF_INITIAL | NXFMF_ADD | NXFMF_DELETE + | NXFMF_MODIFY | NXFMF_ACTIONS | NXFMF_OWN)) { + VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR has bad flags %#"PRIx16, + flags); + return OFPERR_NXBRC_FM_BAD_FLAGS; + } + + if (!is_all_zeros(nfmr->zeros, sizeof nfmr->zeros)) { + return OFPERR_NXBRC_MUST_BE_ZERO; + } + + rq->id = ntohl(nfmr->id); + rq->flags = flags; + rq->out_port = ntohs(nfmr->out_port); + rq->table_id = nfmr->table_id; + + return nx_pull_match(msg, ntohs(nfmr->match_len), OFP_DEFAULT_PRIORITY, + &rq->match, NULL, NULL); +} + +void +ofputil_append_flow_monitor_request( + const struct ofputil_flow_monitor_request *rq, struct ofpbuf *msg) +{ + struct nx_flow_monitor_request *nfmr; + size_t start_ofs; + int match_len; + + if (!msg->size) { + ofputil_put_stats_header(alloc_xid(), OFPT10_STATS_REQUEST, + htons(OFPST_VENDOR), + htonl(NXST_FLOW_MONITOR), msg); + } + + start_ofs = msg->size; + ofpbuf_put_zeros(msg, sizeof *nfmr); + match_len = nx_put_match(msg, false, &rq->match, htonll(0), htonll(0)); + + nfmr = ofpbuf_at_assert(msg, start_ofs, sizeof *nfmr); + nfmr->id = htonl(rq->id); + nfmr->flags = htons(rq->flags); + nfmr->out_port = htons(rq->out_port); + nfmr->match_len = htons(match_len); + nfmr->table_id = rq->table_id; +} + +/* Converts an NXST_FLOW_MONITOR reply (also known as a flow update) in 'msg' + * into an abstract ofputil_flow_update in 'update'. The caller must have + * initialized update->match to point to space allocated for a cls_rule. + * + * Uses 'ofpacts' to store the abstract OFPACT_* version of the update's + * actions (except for NXFME_ABBREV, which never includes actions). The caller + * must initialize 'ofpacts' and retains ownership of it. 'update->ofpacts' + * will point into the 'ofpacts' buffer. + * + * Multiple flow updates can be packed into a single OpenFlow message. Calling + * this function multiple times for a single 'msg' iterates through the + * updates. The caller must initially leave 'msg''s layer pointers null and + * not modify them between calls. + * + * Returns 0 if successful, EOF if no updates were left in this 'msg', + * otherwise an OFPERR_* value. */ +int +ofputil_decode_flow_update(struct ofputil_flow_update *update, + struct ofpbuf *msg, struct ofpbuf *ofpacts) +{ + struct nx_flow_update_header *nfuh; + unsigned int length; + + if (!msg->l2) { + msg->l2 = msg->data; + ofpbuf_pull(msg, sizeof(struct nicira_stats_msg)); + } + + if (!msg->size) { + return EOF; + } + + if (msg->size < sizeof(struct nx_flow_update_header)) { + goto bad_len; + } + + nfuh = msg->data; + update->event = ntohs(nfuh->event); + length = ntohs(nfuh->length); + if (length > msg->size || length % 8) { + goto bad_len; + } + if (update->event == NXFME_ABBREV) { + struct nx_flow_update_abbrev *nfua; + + if (length != sizeof *nfua) { + goto bad_len; + } + + nfua = ofpbuf_pull(msg, sizeof *nfua); + update->xid = nfua->xid; + return 0; + } else if (update->event == NXFME_ADDED + || update->event == NXFME_DELETED + || update->event == NXFME_MODIFIED) { + struct nx_flow_update_full *nfuf; + unsigned int actions_len; + unsigned int match_len; + enum ofperr error; + + if (length < sizeof *nfuf) { + goto bad_len; + } + + nfuf = ofpbuf_pull(msg, sizeof *nfuf); + match_len = ntohs(nfuf->match_len); + if (sizeof *nfuf + match_len > length) { + goto bad_len; + } + + update->reason = ntohs(nfuf->reason); + update->idle_timeout = ntohs(nfuf->idle_timeout); + update->hard_timeout = ntohs(nfuf->hard_timeout); + update->table_id = nfuf->table_id; + update->cookie = nfuf->cookie; + + error = nx_pull_match(msg, match_len, ntohs(nfuf->priority), + update->match, NULL, NULL); + if (error) { + return error; + } + + actions_len = length - sizeof *nfuf - ROUND_UP(match_len, 8); + error = ofpacts_pull_openflow10(msg, actions_len, ofpacts); + if (error) { + return error; + } + + update->ofpacts = ofpacts->data; + update->ofpacts_len = ofpacts->size; + return 0; + } else { + VLOG_WARN_RL(&bad_ofmsg_rl, + "NXST_FLOW_MONITOR reply has bad event %"PRIu16, + ntohs(nfuh->event)); + return OFPERR_OFPET_BAD_REQUEST; + } + +bad_len: + VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR reply has %zu " + "leftover bytes at end", msg->size); + return OFPERR_OFPBRC_BAD_LEN; +} + +uint32_t +ofputil_decode_flow_monitor_cancel(const struct ofp_header *oh) +{ + return ntohl(((const struct nx_flow_monitor_cancel *) oh)->id); +} + +struct ofpbuf * +ofputil_encode_flow_monitor_cancel(uint32_t id) +{ + struct nx_flow_monitor_cancel *nfmc; + struct ofpbuf *msg; + + nfmc = make_nxmsg(sizeof *nfmc, NXT_FLOW_MONITOR_CANCEL, &msg); + nfmc->id = htonl(id); + return msg; +} + +void +ofputil_start_flow_update(struct list *replies) +{ + struct ofpbuf *msg; + + msg = ofpbuf_new(1024); + ofputil_put_stats_header(htonl(0), OFPT10_STATS_REPLY, + htons(OFPST_VENDOR), + htonl(NXST_FLOW_MONITOR), msg); + + list_init(replies); + list_push_back(replies, &msg->list_node); +} + +void +ofputil_append_flow_update(const struct ofputil_flow_update *update, + struct list *replies) +{ + struct nx_flow_update_header *nfuh; + struct ofpbuf *msg; + size_t start_ofs; + + msg = ofpbuf_from_list(list_back(replies)); + start_ofs = msg->size; + + if (update->event == NXFME_ABBREV) { + struct nx_flow_update_abbrev *nfua; + + nfua = ofpbuf_put_zeros(msg, sizeof *nfua); + nfua->xid = update->xid; + } else { + struct nx_flow_update_full *nfuf; + int match_len; + + ofpbuf_put_zeros(msg, sizeof *nfuf); + match_len = nx_put_match(msg, false, update->match, + htonll(0), htonll(0)); + ofpacts_put_openflow10(update->ofpacts, update->ofpacts_len, msg); + + nfuf = ofpbuf_at_assert(msg, start_ofs, sizeof *nfuf); + nfuf->reason = htons(update->reason); + nfuf->priority = htons(update->match->priority); + nfuf->idle_timeout = htons(update->idle_timeout); + nfuf->hard_timeout = htons(update->hard_timeout); + nfuf->match_len = htons(match_len); + nfuf->table_id = update->table_id; + nfuf->cookie = update->cookie; + } + + nfuh = ofpbuf_at_assert(msg, start_ofs, sizeof *nfuh); + nfuh->length = htons(msg->size - start_ofs); + nfuh->event = htons(update->event); + + ofputil_postappend_stats_reply(start_ofs, replies); +} + struct ofpbuf * ofputil_encode_packet_out(const struct ofputil_packet_out *po) { diff --git a/lib/ofp-util.h b/lib/ofp-util.h index 5b1e8edb..f7d3307f 100644 --- a/lib/ofp-util.h +++ b/lib/ofp-util.h @@ -86,14 +86,19 @@ enum ofputil_msg_code { OFPUTIL_NXT_FLOW_AGE, OFPUTIL_NXT_SET_ASYNC_CONFIG, OFPUTIL_NXT_SET_CONTROLLER_ID, + OFPUTIL_NXT_FLOW_MONITOR_CANCEL, + OFPUTIL_NXT_FLOW_MONITOR_PAUSED, + OFPUTIL_NXT_FLOW_MONITOR_RESUMED, /* NXST_* stat requests. */ OFPUTIL_NXST_FLOW_REQUEST, OFPUTIL_NXST_AGGREGATE_REQUEST, + OFPUTIL_NXST_FLOW_MONITOR_REQUEST, /* NXST_* stat replies. */ OFPUTIL_NXST_FLOW_REPLY, - OFPUTIL_NXST_AGGREGATE_REPLY + OFPUTIL_NXST_AGGREGATE_REPLY, + OFPUTIL_NXST_FLOW_MONITOR_REPLY, }; struct ofputil_msg_type; @@ -506,6 +511,48 @@ enum ofperr ofputil_decode_port_mod(const struct ofp_header *, struct ofpbuf *ofputil_encode_port_mod(const struct ofputil_port_mod *, enum ofputil_protocol); +/* Abstract nx_flow_monitor_request. */ +struct ofputil_flow_monitor_request { + uint32_t id; + enum nx_flow_monitor_flags flags; + uint16_t out_port; + uint8_t table_id; + struct cls_rule match; +}; + +int ofputil_decode_flow_monitor_request(struct ofputil_flow_monitor_request *, + struct ofpbuf *msg); +void ofputil_append_flow_monitor_request( + const struct ofputil_flow_monitor_request *, struct ofpbuf *msg); + +/* Abstract nx_flow_update. */ +struct ofputil_flow_update { + enum nx_flow_update_event event; + + /* Used only for NXFME_ADDED, NXFME_DELETED, NXFME_MODIFIED. */ + enum ofp_flow_removed_reason reason; + uint16_t idle_timeout; + uint16_t hard_timeout; + uint8_t table_id; + ovs_be64 cookie; + struct cls_rule *match; + struct ofpact *ofpacts; + size_t ofpacts_len; + + /* Used only for NXFME_ABBREV. */ + ovs_be32 xid; +}; + +int ofputil_decode_flow_update(struct ofputil_flow_update *, + struct ofpbuf *msg, struct ofpbuf *ofpacts); +void ofputil_start_flow_update(struct list *replies); +void ofputil_append_flow_update(const struct ofputil_flow_update *, + struct list *replies); + +/* Abstract nx_flow_monitor_cancel. */ +uint32_t ofputil_decode_flow_monitor_cancel(const struct ofp_header *); +struct ofpbuf *ofputil_encode_flow_monitor_cancel(uint32_t id); + /* OpenFlow protocol utility functions. */ void *make_openflow(size_t openflow_len, uint8_t type, struct ofpbuf **); void *make_nxmsg(size_t openflow_len, uint32_t subtype, struct ofpbuf **); diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c index 3e750d24..b70b0708 100644 --- a/ofproto/connmgr.c +++ b/ofproto/connmgr.c @@ -88,6 +88,13 @@ struct ofconn { * that the message might be generated, a 0-bit disables it. */ uint32_t master_async_config[OAM_N_TYPES]; /* master, other */ uint32_t slave_async_config[OAM_N_TYPES]; /* slave */ + + /* Flow monitors. */ + struct hmap monitors; /* Contains "struct ofmonitor"s. */ + struct list updates; /* List of "struct ofpbuf"s. */ + bool sent_abbrev_update; /* Does 'updates' contain NXFME_ABBREV? */ + struct rconn_packet_counter *monitor_counter; + uint64_t monitor_paused; }; static struct ofconn *ofconn_create(struct connmgr *, struct rconn *, @@ -162,6 +169,8 @@ struct connmgr { static void update_in_band_remotes(struct connmgr *); static void add_snooper(struct connmgr *, struct vconn *); +static void ofmonitor_run(struct connmgr *); +static void ofmonitor_wait(struct connmgr *); /* Creates and returns a new connection manager owned by 'ofproto'. 'name' is * a name for the ofproto suitable for using in log messages. @@ -267,6 +276,7 @@ connmgr_run(struct connmgr *mgr, LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) { ofconn_run(ofconn, handle_openflow); } + ofmonitor_run(mgr); /* Fail-open maintenance. Do this after processing the ofconns since * fail-open checks the status of the controller rconn. */ @@ -326,6 +336,7 @@ connmgr_wait(struct connmgr *mgr, bool handling_openflow) LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { ofconn_wait(ofconn, handling_openflow); } + ofmonitor_wait(mgr); if (handling_openflow && mgr->in_band) { in_band_wait(mgr->in_band); } @@ -1002,6 +1013,9 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type, list_init(&ofconn->opgroups); + hmap_init(&ofconn->monitors); + list_init(&ofconn->updates); + ofconn_flush(ofconn); return ofconn; @@ -1012,6 +1026,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type, static void ofconn_flush(struct ofconn *ofconn) { + struct ofmonitor *monitor, *next_monitor; int i; ofconn->role = NX_ROLE_OTHER; @@ -1080,6 +1095,14 @@ ofconn_flush(struct ofconn *ofconn) memset(ofconn->slave_async_config, 0, sizeof ofconn->slave_async_config); } + + HMAP_FOR_EACH_SAFE (monitor, next_monitor, ofconn_node, + &ofconn->monitors) { + ofmonitor_destroy(monitor); + } + rconn_packet_counter_destroy(ofconn->monitor_counter); + ofconn->monitor_counter = rconn_packet_counter_create(); + ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */ } static void @@ -1096,6 +1119,7 @@ ofconn_destroy(struct ofconn *ofconn) rconn_packet_counter_destroy(ofconn->packet_in_counter); rconn_packet_counter_destroy(ofconn->reply_counter); pktbuf_destroy(ofconn->pktbuf); + rconn_packet_counter_destroy(ofconn->monitor_counter); free(ofconn); } @@ -1646,3 +1670,239 @@ ofservice_lookup(struct connmgr *mgr, const char *target) } return NULL; } + +/* Flow monitors (NXST_FLOW_MONITOR). */ + +/* A counter incremented when something significant happens to an OpenFlow + * rule. + * + * - When a rule is added, its 'add_seqno' and 'modify_seqno' are set to + * the current value (which is then incremented). + * + * - When a rule is modified, its 'modify_seqno' is set to the current + * value (which is then incremented). + * + * Thus, by comparing an old value of monitor_seqno against a rule's + * 'add_seqno', one can tell whether the rule was added before or after the old + * value was read, and similarly for 'modify_seqno'. + * + * 32 bits should normally be sufficient (and would be nice, to save space in + * each rule) but then we'd have to have some special cases for wraparound. + * + * We initialize monitor_seqno to 1 to allow 0 to be used as an invalid + * value. */ +static uint64_t monitor_seqno = 1; + +COVERAGE_DEFINE(ofmonitor_pause); +COVERAGE_DEFINE(ofmonitor_resume); + +enum ofperr +ofmonitor_create(const struct ofputil_flow_monitor_request *request, + struct ofconn *ofconn, struct ofmonitor **monitorp) +{ + struct ofmonitor *m; + + *monitorp = NULL; + + m = ofmonitor_lookup(ofconn, request->id); + if (m) { + return OFPERR_NXBRC_FM_DUPLICATE_ID; + } + + m = xmalloc(sizeof *m); + m->ofconn = ofconn; + hmap_insert(&ofconn->monitors, &m->ofconn_node, hash_int(request->id, 0)); + m->id = request->id; + m->flags = request->flags; + m->out_port = request->out_port; + m->table_id = request->table_id; + m->match = request->match; + + *monitorp = m; + return 0; +} + +struct ofmonitor * +ofmonitor_lookup(struct ofconn *ofconn, uint32_t id) +{ + struct ofmonitor *m; + + HMAP_FOR_EACH_IN_BUCKET (m, ofconn_node, hash_int(id, 0), + &ofconn->monitors) { + if (m->id == id) { + return m; + } + } + return NULL; +} + +void +ofmonitor_destroy(struct ofmonitor *m) +{ + if (m) { + hmap_remove(&m->ofconn->monitors, &m->ofconn_node); + free(m); + } +} + +void +ofmonitor_report(struct connmgr *mgr, struct rule *rule, + enum nx_flow_update_event event, + enum ofp_flow_removed_reason reason, + const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid) +{ + enum nx_flow_monitor_flags update; + struct ofconn *ofconn; + + switch (event) { + case NXFME_ADDED: + update = NXFMF_ADD; + rule->add_seqno = rule->modify_seqno = monitor_seqno++; + break; + + case NXFME_DELETED: + update = NXFMF_DELETE; + break; + + case NXFME_MODIFIED: + update = NXFMF_MODIFY; + rule->modify_seqno = monitor_seqno++; + break; + + default: + case NXFME_ABBREV: + NOT_REACHED(); + } + + LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { + enum nx_flow_monitor_flags flags = 0; + struct ofmonitor *m; + + if (ofconn->monitor_paused) { + /* Only send NXFME_DELETED notifications for flows that were added + * before we paused. */ + if (event != NXFME_DELETED + || rule->add_seqno > ofconn->monitor_paused) { + continue; + } + } + + HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) { + if (m->flags & update + && (m->table_id == 0xff || m->table_id == rule->table_id) + && ofoperation_has_out_port(rule->pending, m->out_port) + && cls_rule_is_loose_match(&rule->cr, &m->match)) { + flags |= m->flags; + } + } + + if (flags) { + if (list_is_empty(&ofconn->updates)) { + ofputil_start_flow_update(&ofconn->updates); + ofconn->sent_abbrev_update = false; + } + + if (ofconn != abbrev_ofconn || ofconn->monitor_paused) { + struct ofputil_flow_update fu; + + fu.event = event; + fu.reason = event == NXFME_DELETED ? reason : 0; + fu.idle_timeout = rule->idle_timeout; + fu.hard_timeout = rule->hard_timeout; + fu.table_id = rule->table_id; + fu.cookie = rule->flow_cookie; + fu.match = &rule->cr; + if (flags & NXFMF_ACTIONS) { + fu.ofpacts = rule->ofpacts; + fu.ofpacts_len = rule->ofpacts_len; + } else { + fu.ofpacts = NULL; + fu.ofpacts_len = 0; + } + ofputil_append_flow_update(&fu, &ofconn->updates); + } else if (!ofconn->sent_abbrev_update) { + struct ofputil_flow_update fu; + + fu.event = NXFME_ABBREV; + fu.xid = abbrev_xid; + ofputil_append_flow_update(&fu, &ofconn->updates); + + ofconn->sent_abbrev_update = true; + } + } + } +} + +void +ofmonitor_flush(struct connmgr *mgr) +{ + struct ofconn *ofconn; + + LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { + struct ofpbuf *msg, *next; + + LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) { + list_remove(&msg->list_node); + ofconn_send(ofconn, msg, ofconn->monitor_counter); + if (!ofconn->monitor_paused + && ofconn->monitor_counter->n_bytes > 128 * 1024) { + struct ofpbuf *pause; + + COVERAGE_INC(ofmonitor_pause); + ofconn->monitor_paused = monitor_seqno++; + make_nxmsg_xid(sizeof(struct nicira_header), + NXT_FLOW_MONITOR_PAUSED, htonl(0), &pause); + ofconn_send(ofconn, pause, ofconn->monitor_counter); + } + } + } +} + +static void +ofmonitor_resume(struct ofconn *ofconn) +{ + struct ofpbuf *resume; + struct ofmonitor *m; + struct list rules; + struct list msgs; + + list_init(&rules); + HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) { + ofmonitor_collect_resume_rules(m, ofconn->monitor_paused, &rules); + } + + list_init(&msgs); + ofmonitor_compose_refresh_updates(&rules, &msgs); + + make_nxmsg_xid(sizeof(struct nicira_header), + NXT_FLOW_MONITOR_RESUMED, htonl(0), &resume); + list_push_back(&msgs, &resume->list_node); + ofconn_send_replies(ofconn, &msgs); + + ofconn->monitor_paused = 0; +} + +static void +ofmonitor_run(struct connmgr *mgr) +{ + struct ofconn *ofconn; + + LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { + if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) { + COVERAGE_INC(ofmonitor_resume); + ofmonitor_resume(ofconn); + } + } +} + +static void +ofmonitor_wait(struct connmgr *mgr) +{ + struct ofconn *ofconn; + + LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { + if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) { + poll_immediate_wake(); + } + } +} diff --git a/ofproto/connmgr.h b/ofproto/connmgr.h index dec5b71c..24a33fb0 100644 --- a/ofproto/connmgr.h +++ b/ofproto/connmgr.h @@ -17,6 +17,7 @@ #ifndef CONNMGR_H #define CONNMGR_H 1 +#include "classifier.h" #include "hmap.h" #include "list.h" #include "ofp-errors.h" @@ -30,6 +31,7 @@ struct ofopgroup; struct ofputil_flow_removed; struct ofputil_packet_in; struct ofputil_phy_port; +struct rule; struct simap; struct sset; @@ -159,4 +161,34 @@ bool connmgr_may_set_up_flow(struct connmgr *, const struct flow *, /* Fail-open and in-band implementation. */ void connmgr_flushed(struct connmgr *); +/* A flow monitor managed by NXST_FLOW_MONITOR and related requests. */ +struct ofmonitor { + struct ofconn *ofconn; /* Owning 'ofconn'. */ + struct hmap_node ofconn_node; /* In ofconn's 'monitors' hmap. */ + uint32_t id; + + enum nx_flow_monitor_flags flags; + + /* Matching. */ + uint16_t out_port; + uint8_t table_id; + struct cls_rule match; +}; + +struct ofputil_flow_monitor_request; + +enum ofperr ofmonitor_create(const struct ofputil_flow_monitor_request *, + struct ofconn *, struct ofmonitor **); +struct ofmonitor *ofmonitor_lookup(struct ofconn *, uint32_t id); +void ofmonitor_destroy(struct ofmonitor *); + +void ofmonitor_report(struct connmgr *, struct rule *, + enum nx_flow_update_event, enum ofp_flow_removed_reason, + const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid); +void ofmonitor_flush(struct connmgr *); + +void ofmonitor_collect_resume_rules(struct ofmonitor *, uint64_t seqno, + struct list *rules); +void ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs); + #endif /* connmgr.h */ diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h index f22c9f61..6eef1063 100644 --- a/ofproto/ofproto-provider.h +++ b/ofproto/ofproto-provider.h @@ -187,6 +187,11 @@ struct rule { struct ofpact *ofpacts; /* Sequence of "struct ofpacts". */ unsigned int ofpacts_len; /* Size of 'ofpacts', in bytes. */ + + /* Flow monitors. */ + enum nx_flow_monitor_flags monitor_flags; + uint64_t add_seqno; /* Sequence number when added. */ + uint64_t modify_seqno; /* Sequence number when changed. */ }; static inline struct rule * @@ -199,9 +204,15 @@ void ofproto_rule_update_used(struct rule *, long long int used); void ofproto_rule_expire(struct rule *, uint8_t reason); void ofproto_rule_destroy(struct rule *); +bool ofproto_rule_has_out_port(const struct rule *, uint16_t out_port); + void ofoperation_complete(struct ofoperation *, enum ofperr); struct rule *ofoperation_get_victim(struct ofoperation *); +bool ofoperation_has_out_port(const struct ofoperation *, uint16_t out_port); + +bool ofproto_rule_is_hidden(const struct rule *); + /* ofproto class structure, to be defined by each ofproto implementation. * * diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c index 93401919..b187c86f 100644 --- a/ofproto/ofproto.c +++ b/ofproto/ofproto.c @@ -126,13 +126,17 @@ struct ofoperation { struct ofpact *ofpacts; size_t ofpacts_len; + /* OFOPERATION_DELETE. */ + enum ofp_flow_removed_reason reason; /* Reason flow was removed. */ + ovs_be64 flow_cookie; /* Rule's old flow cookie. */ enum ofperr error; /* 0 if no error. */ }; static struct ofoperation *ofoperation_create(struct ofopgroup *, struct rule *, - enum ofoperation_type); + enum ofoperation_type, + enum ofp_flow_removed_reason); static void ofoperation_destroy(struct ofoperation *); /* oftable. */ @@ -188,7 +192,6 @@ static void reinit_ports(struct ofproto *); static void ofproto_rule_destroy__(struct rule *); static void ofproto_rule_send_removed(struct rule *, uint8_t reason); static bool rule_is_modifiable(const struct rule *); -static bool rule_is_hidden(const struct rule *); /* OpenFlow. */ static enum ofperr add_flow(struct ofproto *, struct ofconn *, @@ -952,7 +955,8 @@ ofproto_flush__(struct ofproto *ofproto) cls_cursor_init(&cursor, &table->cls, NULL); CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) { if (!rule->pending) { - ofoperation_create(group, rule, OFOPERATION_DELETE); + ofoperation_create(group, rule, OFOPERATION_DELETE, + OFPRR_DELETE); oftable_remove_rule(rule); ofproto->ofproto_class->rule_destruct(rule); } @@ -1445,7 +1449,7 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target) } else { /* Initiate deletion -> success. */ struct ofopgroup *group = ofopgroup_create_unattached(ofproto); - ofoperation_create(group, rule, OFOPERATION_DELETE); + ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE); oftable_remove_rule(rule); ofproto->ofproto_class->rule_destruct(rule); ofopgroup_submit(group); @@ -1894,13 +1898,36 @@ ofproto_rule_destroy(struct rule *rule) /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action * that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */ -static bool -rule_has_out_port(const struct rule *rule, uint16_t port) +bool +ofproto_rule_has_out_port(const struct rule *rule, uint16_t port) { return (port == OFPP_NONE || ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port)); } +/* Returns true if a rule related to 'op' has an OpenFlow OFPAT_OUTPUT or + * OFPAT_ENQUEUE action that outputs to 'out_port'. */ +bool +ofoperation_has_out_port(const struct ofoperation *op, uint16_t out_port) +{ + if (ofproto_rule_has_out_port(op->rule, out_port)) { + return true; + } + + switch (op->type) { + case OFOPERATION_ADD: + return op->victim && ofproto_rule_has_out_port(op->victim, out_port); + + case OFOPERATION_DELETE: + return false; + + case OFOPERATION_MODIFY: + return ofpacts_output_to_port(op->ofpacts, op->ofpacts_len, out_port); + } + + NOT_REACHED(); +} + /* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s * statistics appropriately. 'packet' must have at least sizeof(struct * ofp_packet_in) bytes of headroom. @@ -1925,8 +1952,8 @@ rule_execute(struct rule *rule, uint16_t in_port, struct ofpbuf *packet) * Rules with priority higher than UINT16_MAX are set up by ofproto itself * (e.g. by in-band control) and are intentionally hidden from the * controller. */ -static bool -rule_is_hidden(const struct rule *rule) +bool +ofproto_rule_is_hidden(const struct rule *rule) { return rule->cr.priority > UINT16_MAX; } @@ -2393,7 +2420,8 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id, if (rule->pending) { return OFPROTO_POSTPONE; } - if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port) + if (!ofproto_rule_is_hidden(rule) + && ofproto_rule_has_out_port(rule, out_port) && !((rule->flow_cookie ^ cookie) & cookie_mask)) { list_push_back(rules, &rule->ofproto_node); } @@ -2437,7 +2465,8 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id, if (rule->pending) { return OFPROTO_POSTPONE; } - if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port) + if (!ofproto_rule_is_hidden(rule) + && ofproto_rule_has_out_port(rule, out_port) && !((rule->flow_cookie ^ cookie) & cookie_mask)) { list_push_back(rules, &rule->ofproto_node); } @@ -2855,6 +2884,9 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn, rule->ofpacts_len = fm->ofpacts_len; rule->evictable = true; rule->eviction_group = NULL; + rule->monitor_flags = 0; + rule->add_seqno = 0; + rule->modify_seqno = 0; /* Insert new rule. */ victim = oftable_replace_rule(rule); @@ -2886,7 +2918,7 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn, } group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id); - op = ofoperation_create(group, rule, OFOPERATION_ADD); + op = ofoperation_create(group, rule, OFOPERATION_ADD, 0); op->victim = victim; error = ofproto->ofproto_class->rule_construct(rule); @@ -2950,7 +2982,7 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn, continue; } - op = ofoperation_create(group, rule, OFOPERATION_MODIFY); + op = ofoperation_create(group, rule, OFOPERATION_MODIFY, 0); rule->flow_cookie = new_cookie; if (actions_changed) { op->ofpacts = rule->ofpacts; @@ -3029,7 +3061,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group) ofproto_rule_send_removed(rule, OFPRR_DELETE); - ofoperation_create(group, rule, OFOPERATION_DELETE); + ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE); oftable_remove_rule(rule); ofproto->ofproto_class->rule_destruct(rule); } @@ -3094,7 +3126,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason) { struct ofputil_flow_removed fr; - if (rule_is_hidden(rule) || !rule->send_flow_removed) { + if (ofproto_rule_is_hidden(rule) || !rule->send_flow_removed) { return; } @@ -3144,7 +3176,7 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason) ofproto_rule_send_removed(rule, reason); group = ofopgroup_create_unattached(ofproto); - ofoperation_create(group, rule, OFOPERATION_DELETE); + ofoperation_create(group, rule, OFOPERATION_DELETE, reason); oftable_remove_rule(rule); ofproto->ofproto_class->rule_destruct(rule); ofopgroup_submit(group); @@ -3397,6 +3429,255 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh) return 0; } +static void +ofproto_compose_flow_refresh_update(const struct rule *rule, + enum nx_flow_monitor_flags flags, + struct list *msgs) +{ + struct ofoperation *op = rule->pending; + struct ofputil_flow_update fu; + + if (op && op->type == OFOPERATION_ADD && !op->victim) { + /* We'll report the final flow when the operation completes. Reporting + * it now would cause a duplicate report later. */ + return; + } + + fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD) + ? NXFME_ADDED : NXFME_MODIFIED); + fu.reason = 0; + fu.idle_timeout = rule->idle_timeout; + fu.hard_timeout = rule->hard_timeout; + fu.table_id = rule->table_id; + fu.cookie = rule->flow_cookie; + fu.match = (struct cls_rule *) &rule->cr; + if (!(flags & NXFMF_ACTIONS)) { + fu.ofpacts = NULL; + fu.ofpacts_len = 0; + } else if (!op) { + fu.ofpacts = rule->ofpacts; + fu.ofpacts_len = rule->ofpacts_len; + } else { + /* An operation is in progress. Use the previous version of the flow's + * actions, so that when the operation commits we report the change. */ + switch (op->type) { + case OFOPERATION_ADD: + /* We already verified that there was a victim. */ + fu.ofpacts = op->victim->ofpacts; + fu.ofpacts_len = op->victim->ofpacts_len; + break; + + case OFOPERATION_MODIFY: + if (op->ofpacts) { + fu.ofpacts = op->ofpacts; + fu.ofpacts_len = op->ofpacts_len; + } else { + fu.ofpacts = rule->ofpacts; + fu.ofpacts_len = rule->ofpacts_len; + } + break; + + case OFOPERATION_DELETE: + fu.ofpacts = rule->ofpacts; + fu.ofpacts_len = rule->ofpacts_len; + break; + + default: + NOT_REACHED(); + } + } + + if (list_is_empty(msgs)) { + ofputil_start_flow_update(msgs); + } + ofputil_append_flow_update(&fu, msgs); +} + +void +ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs) +{ + struct rule *rule; + + LIST_FOR_EACH (rule, ofproto_node, rules) { + enum nx_flow_monitor_flags flags = rule->monitor_flags; + rule->monitor_flags = 0; + + ofproto_compose_flow_refresh_update(rule, flags, msgs); + } +} + +static void +ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m, + struct rule *rule, uint64_t seqno, + struct list *rules) +{ + enum nx_flow_monitor_flags update; + + if (ofproto_rule_is_hidden(rule)) { + return; + } + + if (!(rule->pending + ? ofoperation_has_out_port(rule->pending, m->out_port) + : ofproto_rule_has_out_port(rule, m->out_port))) { + return; + } + + if (seqno) { + if (rule->add_seqno > seqno) { + update = NXFMF_ADD | NXFMF_MODIFY; + } else if (rule->modify_seqno > seqno) { + update = NXFMF_MODIFY; + } else { + return; + } + + if (!(m->flags & update)) { + return; + } + } else { + update = NXFMF_INITIAL; + } + + if (!rule->monitor_flags) { + list_push_back(rules, &rule->ofproto_node); + } + rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS); +} + +static void +ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m, + uint64_t seqno, + struct list *rules) +{ + const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn); + const struct ofoperation *op; + const struct oftable *table; + + FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) { + struct cls_cursor cursor; + struct rule *rule; + + cls_cursor_init(&cursor, &table->cls, &m->match); + CLS_CURSOR_FOR_EACH (rule, cr, &cursor) { + assert(!rule->pending); /* XXX */ + ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules); + } + } + + HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) { + struct rule *rule = op->rule; + + if (((m->table_id == 0xff + ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN) + : m->table_id == rule->table_id)) + && cls_rule_is_loose_match(&rule->cr, &m->match)) { + ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules); + } + } +} + +static void +ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m, + struct list *rules) +{ + if (m->flags & NXFMF_INITIAL) { + ofproto_collect_ofmonitor_refresh_rules(m, 0, rules); + } +} + +void +ofmonitor_collect_resume_rules(struct ofmonitor *m, + uint64_t seqno, struct list *rules) +{ + ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules); +} + +static enum ofperr +handle_flow_monitor_request(struct ofconn *ofconn, + const struct ofp_stats_msg *osm) +{ + struct ofproto *ofproto = ofconn_get_ofproto(ofconn); + struct ofmonitor **monitors; + size_t n_monitors, allocated_monitors; + struct list replies; + enum ofperr error; + struct list rules; + struct ofpbuf b; + size_t i; + + error = 0; + ofpbuf_use_const(&b, osm, ntohs(osm->header.length)); + monitors = NULL; + n_monitors = allocated_monitors = 0; + for (;;) { + struct ofputil_flow_monitor_request request; + struct ofmonitor *m; + int retval; + + retval = ofputil_decode_flow_monitor_request(&request, &b); + if (retval == EOF) { + break; + } else if (retval) { + error = retval; + goto error; + } + + if (request.table_id != 0xff + && request.table_id >= ofproto->n_tables) { + error = OFPERR_OFPBRC_BAD_TABLE_ID; + goto error; + } + + error = ofmonitor_create(&request, ofconn, &m); + if (error) { + goto error; + } + + if (n_monitors >= allocated_monitors) { + monitors = x2nrealloc(monitors, &allocated_monitors, + sizeof *monitors); + } + monitors[n_monitors++] = m; + } + + list_init(&rules); + for (i = 0; i < n_monitors; i++) { + ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules); + } + + ofputil_start_stats_reply(osm, &replies); + ofmonitor_compose_refresh_updates(&rules, &replies); + ofconn_send_replies(ofconn, &replies); + + free(monitors); + + return 0; + +error: + for (i = 0; i < n_monitors; i++) { + ofmonitor_destroy(monitors[i]); + } + free(monitors); + return error; +} + +static enum ofperr +handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh) +{ + struct ofmonitor *m; + uint32_t id; + + id = ofputil_decode_flow_monitor_cancel(oh); + m = ofmonitor_lookup(ofconn, id); + if (!m) { + return OFPERR_NXBRC_FM_BAD_ID; + } + + ofmonitor_destroy(m); + return 0; +} + static enum ofperr handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg) { @@ -3462,6 +3743,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg) /* Nothing to do. */ return 0; + case OFPUTIL_NXT_FLOW_MONITOR_CANCEL: + return handle_flow_monitor_cancel(ofconn, oh); + case OFPUTIL_NXT_SET_ASYNC_CONFIG: return handle_nxt_set_async_config(ofconn, oh); @@ -3489,6 +3773,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg) case OFPUTIL_OFPST_PORT_DESC_REQUEST: return handle_port_desc_stats_request(ofconn, msg->data); + case OFPUTIL_NXST_FLOW_MONITOR_REQUEST: + return handle_flow_monitor_request(ofconn, msg->data); + case OFPUTIL_MSG_INVALID: case OFPUTIL_OFPT_HELLO: case OFPUTIL_OFPT_ERROR: @@ -3510,8 +3797,11 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg) case OFPUTIL_NXT_ROLE_REPLY: case OFPUTIL_NXT_FLOW_REMOVED: case OFPUTIL_NXT_PACKET_IN: + case OFPUTIL_NXT_FLOW_MONITOR_PAUSED: + case OFPUTIL_NXT_FLOW_MONITOR_RESUMED: case OFPUTIL_NXST_FLOW_REPLY: case OFPUTIL_NXST_AGGREGATE_REPLY: + case OFPUTIL_NXST_FLOW_MONITOR_REPLY: default: return (oh->type == OFPT10_STATS_REQUEST || oh->type == OFPT10_STATS_REPLY @@ -3600,6 +3890,10 @@ static void ofopgroup_complete(struct ofopgroup *group) { struct ofproto *ofproto = group->ofproto; + + struct ofconn *abbrev_ofconn; + ovs_be32 abbrev_xid; + struct ofoperation *op, *next_op; int error; @@ -3630,8 +3924,28 @@ ofopgroup_complete(struct ofopgroup *group) } } + if (!error && !list_is_empty(&group->ofconn_node)) { + abbrev_ofconn = group->ofconn; + abbrev_xid = group->request->xid; + } else { + abbrev_ofconn = NULL; + abbrev_xid = htonl(0); + } LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) { struct rule *rule = op->rule; + + if (!op->error && !ofproto_rule_is_hidden(rule)) { + /* Check that we can just cast from ofoperation_type to + * nx_flow_update_event. */ + BUILD_ASSERT_DECL(OFOPERATION_ADD == NXFME_ADDED); + BUILD_ASSERT_DECL(OFOPERATION_DELETE == NXFME_DELETED); + BUILD_ASSERT_DECL(OFOPERATION_MODIFY == NXFME_MODIFIED); + + ofmonitor_report(ofproto->connmgr, rule, + (enum nx_flow_update_event) op->type, + op->reason, abbrev_ofconn, abbrev_xid); + } + rule->pending = NULL; switch (op->type) { @@ -3685,6 +3999,8 @@ ofopgroup_complete(struct ofopgroup *group) ofoperation_destroy(op); } + ofmonitor_flush(ofproto->connmgr); + if (!list_is_empty(&group->ofproto_node)) { assert(ofproto->n_pending > 0); ofproto->n_pending--; @@ -3704,11 +4020,15 @@ ofopgroup_complete(struct ofopgroup *group) /* Initiates a new operation on 'rule', of the specified 'type', within * 'group'. Prior to calling, 'rule' must not have any pending operation. * + * For a 'type' of OFOPERATION_DELETE, 'reason' should specify the reason that + * the flow is being deleted. For other 'type's, 'reason' is ignored (use 0). + * * Returns the newly created ofoperation (which is also available as * rule->pending). */ static struct ofoperation * ofoperation_create(struct ofopgroup *group, struct rule *rule, - enum ofoperation_type type) + enum ofoperation_type type, + enum ofp_flow_removed_reason reason) { struct ofproto *ofproto = group->ofproto; struct ofoperation *op; @@ -3720,6 +4040,7 @@ ofoperation_create(struct ofopgroup *group, struct rule *rule, list_push_back(&group->ops, &op->group_node); op->rule = rule; op->type = type; + op->reason = reason; op->flow_cookie = rule->flow_cookie; group->n_running++; @@ -3891,7 +4212,8 @@ ofproto_evict(struct ofproto *ofproto) break; } - ofoperation_create(group, rule, OFOPERATION_DELETE); + ofoperation_create(group, rule, + OFOPERATION_DELETE, OFPRR_EVICTION); oftable_remove_rule(rule); ofproto->ofproto_class->rule_destruct(rule); } diff --git a/tests/ofp-print.at b/tests/ofp-print.at index 20148512..1fe54f17 100644 --- a/tests/ofp-print.at +++ b/tests/ofp-print.at @@ -810,6 +810,34 @@ NXT_SET_CONTROLLER_ID (xid=0x3): id=123 ]) AT_CLEANUP +AT_SETUP([NXT_FLOW_MONITOR_CANCEL]) +AT_KEYWORDS([ofp-print]) +AT_CHECK([ovs-ofctl ofp-print "\ +01 04 00 14 00 00 00 03 00 00 23 20 00 00 00 15 \ +01 02 30 40 \ +"], [0], [dnl +NXT_FLOW_MONITOR_CANCEL (xid=0x3): id=16920640 +]) +AT_CLEANUP + +AT_SETUP([NXT_FLOW_MONITOR_PAUSED]) +AT_KEYWORDS([ofp-print]) +AT_CHECK([ovs-ofctl ofp-print "\ +01 04 00 10 00 00 00 03 00 00 23 20 00 00 00 16 \ +"], [0], [dnl +NXT_FLOW_MONITOR_PAUSED (xid=0x3): +]) +AT_CLEANUP + +AT_SETUP([NXT_FLOW_MONITOR_RESUMED]) +AT_KEYWORDS([ofp-print]) +AT_CHECK([ovs-ofctl ofp-print "\ +01 04 00 10 00 00 00 03 00 00 23 20 00 00 00 17 \ +"], [0], [dnl +NXT_FLOW_MONITOR_RESUMED (xid=0x3): +]) +AT_CLEANUP + AT_SETUP([NXT_SET_FLOW_FORMAT]) AT_KEYWORDS([ofp-print]) AT_CHECK([ovs-ofctl ofp-print "\ @@ -1061,3 +1089,30 @@ AT_CHECK([ovs-ofctl ofp-print "\ NXST_AGGREGATE reply (xid=0x4): packet_count=7 byte_count=420 flow_count=7 ]) AT_CLEANUP + +AT_SETUP([NXST_FLOW_MONITOR request]) +AT_KEYWORDS([ofp-print OFPT_STATS_REPLY]) +AT_CHECK([ovs-ofctl ofp-print "\ +01 10 00 40 00 00 00 04 ff ff 00 00 00 00 23 20 00 00 00 02 00 00 00 00 \ +00 00 40 00 00 3f ff fe 00 00 01 00 00 00 00 00 \ +00 00 20 00 00 04 ff ff 00 06 02 00 00 00 00 00 00 00 00 02 00 01 00 00 \ +"], [0], [dnl +NXST_FLOW_MONITOR request (xid=0x4): + id=16384 flags=initial,add,delete,modify,actions,own out_port=LOCAL table=1 + id=8192 flags=delete table=2 in_port=1 +]) +AT_CLEANUP + +AT_SETUP([NXST_FLOW_MONITOR reply]) +AT_KEYWORDS([ofp-print OFPT_STATS_REPLY]) +AT_CHECK([ovs-ofctl ofp-print "\ +01 11 00 40 00 00 00 04 ff ff 00 00 00 00 23 20 00 00 00 02 00 00 00 00 \ +00 20 00 01 00 04 80 00 00 05 00 10 00 06 01 00 12 34 56 78 9a bc de f0 \ +00 00 00 02 00 01 00 00 \ +00 08 00 03 00 01 86 a0 \ +"], [0], [dnl +NXST_FLOW_MONITOR reply (xid=0x4): + event=DELETED reason=eviction table=1 idle_timeout=5 hard_timeout=16 cookie=0x123456789abcdef0 in_port=1 + event=ABBREV xid=0x186a0 +]) +AT_CLEANUP diff --git a/tests/ofproto.at b/tests/ofproto.at index d703fa83..804965b6 100644 --- a/tests/ofproto.at +++ b/tests/ofproto.at @@ -736,3 +736,133 @@ OFPT_BARRIER_REPLY: OVS_VSWITCHD_STOP AT_CLEANUP + +AT_SETUP([ofproto - flow monitoring]) +AT_KEYWORDS([monitor]) +OVS_VSWITCHD_START + +ovs-ofctl add-flow br0 in_port=0,dl_vlan=123,actions=output:1 + +# Start a monitor watching the flow table and check the initial reply. +ovs-ofctl monitor br0 watch: --detach --no-chdir --pidfile >monitor.log 2>&1 +AT_CAPTURE_FILE([monitor.log]) +ovs-appctl -t ovs-ofctl ofctl/barrier +AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0], + [NXST_FLOW_MONITOR reply: + event=ADDED table=0 cookie=0 in_port=0,dl_vlan=123 actions=output:1 +OFPT_BARRIER_REPLY: +]) + +# Add, delete, and modify some flows and check the updates. +ovs-appctl -t ovs-ofctl ofctl/set-output-file monitor.log +ovs-ofctl add-flow br0 in_port=0,dl_vlan=124,actions=output:2 +ovs-ofctl add-flow br0 in_port=0,dl_vlan=123,actions=output:5 +ovs-ofctl mod-flows br0 cookie=5,dl_vlan=123,actions=output:3 +ovs-ofctl del-flows br0 dl_vlan=123 +ovs-ofctl del-flows br0 +ovs-appctl -t ovs-ofctl ofctl/barrier +AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0], +[NXST_FLOW_MONITOR reply (xid=0x0): + event=ADDED table=0 cookie=0 in_port=0,dl_vlan=124 actions=output:2 +NXST_FLOW_MONITOR reply (xid=0x0): + event=ADDED table=0 cookie=0 in_port=0,dl_vlan=123 actions=output:5 +NXST_FLOW_MONITOR reply (xid=0x0): + event=MODIFIED table=0 cookie=0x5 in_port=0,dl_vlan=123 actions=output:3 +NXST_FLOW_MONITOR reply (xid=0x0): + event=DELETED reason=delete table=0 cookie=0x5 in_port=0,dl_vlan=123 actions=output:3 +NXST_FLOW_MONITOR reply (xid=0x0): + event=DELETED reason=delete table=0 cookie=0 in_port=0,dl_vlan=124 actions=output:2 +OFPT_BARRIER_REPLY: +]) + +# Check that our own changes are reported as abbreviations. +ovs-appctl -t ovs-ofctl ofctl/set-output-file monitor.log +ovs-ofctl add-flow br0 in_port=1,actions=output:2 +ovs-ofctl add-flow br0 in_port=2,actions=output:1 +ovs-appctl -t ovs-ofctl ofctl/send 010e004812345678003fffff00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003000000000000ffffffffffff0000 +ovs-appctl -t ovs-ofctl ofctl/barrier +AT_CHECK([ovs-ofctl dump-flows br0 | ofctl_strip], [0], [NXST_FLOW reply: +]) +AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0], +[NXST_FLOW_MONITOR reply (xid=0x0): + event=ADDED table=0 cookie=0 in_port=1 actions=output:2 +NXST_FLOW_MONITOR reply (xid=0x0): + event=ADDED table=0 cookie=0 in_port=2 actions=output:1 +send: OFPT_FLOW_MOD: DEL priority=0 actions=drop +NXST_FLOW_MONITOR reply (xid=0x0): + event=ABBREV xid=0x12345678 +OFPT_BARRIER_REPLY: +]) + +ovs-appctl -t ovs-ofctl exit +OVS_VSWITCHD_STOP +AT_CLEANUP + +AT_SETUP([ofproto - flow monitoring pause and resume]) +AT_KEYWORDS([monitor]) + +# With a Linux kernel, this file has the maximum socket receive buffer +# size. That's important for this test, which tests behavior when the +# receive buffer overflows. +AT_SKIP_IF([test ! -e /proc/sys/net/core/rmem_max]) + +# Calculate the total amount of queuing: rmem_max in the kernel, 128 kB +# in ofproto sending userspace (see ofmonitor_flush() in connmgr.c). +rmem_max=`cat /proc/sys/net/core/rmem_max` +queue_size=`expr $rmem_max + 128 \* 1024` +echo rmem_max=$rmem_max queue_size=$queue_size + +# Each flow update message takes up at least 48 bytes of space in queues +# and in practice more than that. +n_msgs=`expr $queue_size / 48` +echo n_msgs=$n_msgs + +OVS_VSWITCHD_START + +# Start a monitor watching the flow table, then make it block. +ovs-ofctl monitor br0 watch: --detach --no-chdir --pidfile >monitor.log 2>&1 +AT_CAPTURE_FILE([monitor.log]) +ovs-appctl -t ovs-ofctl ofctl/block + +# Add $n_msgs flows. +(echo "in_port=2,actions=output:2" +perl -e ' + for ($i = 0; $i < '$n_msgs'; $i++) { + print "cookie=1,reg1=$i,actions=drop\n"; + } +') > flows.txt +AT_CHECK([ovs-ofctl add-flows br0 flows.txt]) +AT_CHECK([ovs-ofctl add-flow br0 in_port=1,cookie=3,actions=drop]) +AT_CHECK([ovs-ofctl mod-flows br0 in_port=2,cookie=2,actions=output:2]) +AT_CHECK([ovs-ofctl del-flows br0 cookie=1/-1]) + +ovs-appctl -t ovs-ofctl ofctl/unblock +ovs-appctl -t ovs-ofctl ofctl/barrier + +ovs-appctl -t ovs-ofctl exit + +# Check that the flow monitor reported the same number of flows +# added and deleted, but fewer than we actually added and deleted. +adds=`grep -c 'ADDED.*reg1=' monitor.log` +deletes=`grep -c 'DELETED.*reg1=' monitor.log` +echo adds=$adds deletes=$deletes +AT_CHECK([test $adds -gt 100 && test $adds -lt $n_msgs]) +AT_CHECK([test $adds = $deletes]) + +# Check that the flow monitor reported everything in the expected order. +AT_CHECK([ofctl_strip < monitor.log | sed -n -e ' +/reg1=0x22\b/p +/cookie=0x[[23]]/p +/NXT_FLOW_MONITOR_PAUSED:/p +/NXT_FLOW_MONITOR_RESUMED:/p +'], [0], +[ event=ADDED table=0 cookie=0x1 reg1=0x22 +NXT_FLOW_MONITOR_PAUSED: + event=DELETED reason=delete table=0 cookie=0x1 reg1=0x22 + event=ADDED table=0 cookie=0x3 in_port=1 + event=MODIFIED table=0 cookie=0x2 in_port=2 actions=output:2 +NXT_FLOW_MONITOR_RESUMED: +]) + +OVS_VSWITCHD_STOP +AT_CLEANUP diff --git a/utilities/ovs-ofctl.8.in b/utilities/ovs-ofctl.8.in index ebfde0fa..65fc6e8e 100644 --- a/utilities/ovs-ofctl.8.in +++ b/utilities/ovs-ofctl.8.in @@ -282,7 +282,7 @@ If a switch has no controller configured, or if the configured controller is disconnected, no traffic is sent, so monitoring will not show any traffic. . -.IP "\fBmonitor \fIswitch\fR [\fImiss-len\fR] [\fBinvalid_ttl\fR]" +.IP "\fBmonitor \fIswitch\fR [\fImiss-len\fR] [\fBinvalid_ttl\fR] [\fBwatch:\fR[\fIspec\fR...]]" Connects to \fIswitch\fR and prints to the console all OpenFlow messages received. Usually, \fIswitch\fR should specify the name of a bridge in the \fBovs\-vswitchd\fR database. @@ -295,15 +295,46 @@ does not send these and other asynchronous messages to an specified on this argument. (Thus, if \fImiss\-len\fR is not specified, very little traffic will ordinarily be printed.) .IP -.IP If \fBinvalid_ttl\fR is passed, \fBovs\-ofctl\fR sends an OpenFlow ``set configuration'' message at connection setup time that requests \fBINVALID_TTL_TO_CONTROLLER\fR, so that \fBovs\-ofctl monitor\fR can receive ``packet-in'' messages when TTL reaches zero on \fBdec_ttl\fR action. .IP - +\fBwatch:\fR[\fB\fIspec\fR...] causes \fBovs\-ofctl\fR to send a +``monitor request'' Nicira extension message to the switch at +connection setup time. This message causes the switch to send +information about flow table changes as they occur. The following +comma-separated \fIspec\fR syntax is available: +.RS +.IP "\fB!initial\fR" +Do not report the switch's initial flow table contents. +.IP "\fB!add\fR" +Do not report newly added flows. +.IP "\fB!delete\fR" +Do not report deleted flows. +.IP "\fB!modify\fR" +Do not report modifications to existing flows. +.IP "\fB!own\fR" +Abbreviate changes made to the flow table by \fBovs\-ofctl\fR's own +connection to the switch. (These could only occur using the +\fBofctl/send\fR command described below under \fBRUNTIME MANAGEMENT +COMMANDS\fR.) +.IP "\fB!actions\fR" +Do not report actions as part of flow updates. +.IP "\fBtable=\fInumber\fR" +Limits the monitoring to the table with the given \fInumber\fR between +0 and 254. By default, all tables are monitored. +.IP "\fBout_port=\fIport\fR" +If set, only flows that output to \fIport\fR are monitored. +.IP "\fIfield\fB=\fIvalue\fR" +Monitors only flows that have \fIfield\fR specified as the given +\fIvalue\fR. Any syntax valid for matching on \fBdump\-flows\fR may +be used. +.RE +.IP This command may be useful for debugging switch or controller -implementations. +implementations. With \fBwatch:\fR, it is particularly useful for +observing how a controller updates flow tables. . .SS "OpenFlow Switch and Controller Commands" . diff --git a/utilities/ovs-ofctl.c b/utilities/ovs-ofctl.c index 1c75f464..d633d1c6 100644 --- a/utilities/ovs-ofctl.c +++ b/utilities/ovs-ofctl.c @@ -15,6 +15,7 @@ */ #include +#include #include #include #include @@ -278,7 +279,7 @@ usage(void) " diff-flows SOURCE1 SOURCE2 compare flows from two sources\n" " packet-out SWITCH IN_PORT ACTIONS PACKET...\n" " execute ACTIONS on PACKET\n" - " monitor SWITCH [MISSLEN] [invalid_ttl]\n" + " monitor SWITCH [MISSLEN] [invalid_ttl] [watch:[...]]\n" " print packets received from SWITCH\n" " snoop SWITCH snoop on SWITCH and its controller\n" "\nFor OpenFlow switches and controllers:\n" @@ -1255,10 +1256,57 @@ ofctl_set_output_file(struct unixctl_conn *conn, int argc OVS_UNUSED, unixctl_command_reply(conn, NULL); } +struct block_aux { + struct vconn *vconn; + struct unixctl_server *server; + bool blocked; +}; + +static void +ofctl_block(struct unixctl_conn *conn, int argc OVS_UNUSED, + const char *argv[] OVS_UNUSED, void *block_) +{ + struct block_aux *block = block_; + + if (block->blocked) { + unixctl_command_reply(conn, "already blocking"); + return; + } + + block->blocked = true; + unixctl_command_reply(conn, NULL); + for (;;) { + unixctl_server_run(block->server); + if (!block->blocked) { + break; + } + vconn_run(block->vconn); + + unixctl_server_wait(block->server); + vconn_run_wait(block->vconn); + poll_block(); + } +} + +static void +ofctl_unblock(struct unixctl_conn *conn, int argc OVS_UNUSED, + const char *argv[] OVS_UNUSED, void *block_) +{ + struct block_aux *block = block_; + + if (!block->blocked) { + unixctl_command_reply(conn, "not blocking"); + } else { + block->blocked = false; + unixctl_command_reply(conn, NULL); + } +} + static void monitor_vconn(struct vconn *vconn) { struct barrier_aux barrier_aux = { vconn, NULL }; + struct block_aux block; struct unixctl_server *server; bool exiting = false; int error; @@ -1276,6 +1324,13 @@ monitor_vconn(struct vconn *vconn) ofctl_barrier, &barrier_aux); unixctl_command_register("ofctl/set-output-file", "FILE", 1, 1, ofctl_set_output_file, NULL); + + block.vconn = vconn; + block.server = server; + block.blocked = false; + unixctl_command_register("ofctl/block", "", 0, 0, ofctl_block, &block); + unixctl_command_register("ofctl/unblock", "", 0, 0, ofctl_unblock, &block); + daemonize_complete(); for (;;) { @@ -1329,20 +1384,34 @@ static void ofctl_monitor(int argc, char *argv[]) { struct vconn *vconn; + int i; open_vconn(argv[1], &vconn); - if (argc > 2) { - struct ofp_switch_config config; + for (i = 2; i < argc; i++) { + const char *arg = argv[i]; - fetch_switch_config(vconn, &config); - config.miss_send_len = htons(atoi(argv[2])); - set_switch_config(vconn, &config); - } - if (argc > 3) { - if (!strcmp(argv[3], "invalid_ttl")) { + if (isdigit((unsigned char) *arg)) { + struct ofp_switch_config config; + + fetch_switch_config(vconn, &config); + config.miss_send_len = htons(atoi(arg)); + set_switch_config(vconn, &config); + } else if (!strcmp(arg, "invalid_ttl")) { monitor_set_invalid_ttl_to_controller(vconn); + } else if (!strncmp(arg, "watch:", 6)) { + struct ofputil_flow_monitor_request fmr; + struct ofpbuf *msg; + + parse_flow_monitor_request(&fmr, arg + 6); + + msg = ofpbuf_new(0); + ofputil_append_flow_monitor_request(&fmr, msg); + dump_stats_transaction__(vconn, msg); + } else { + ovs_fatal(0, "%s: unsupported \"monitor\" argument", arg); } } + if (preferred_packet_in_format >= 0) { set_packet_in_format(vconn, preferred_packet_in_format); } else {